﻿using RabbitMQ.Client;
using RabbitMQ.Client.Events;

using System.Collections.Concurrent;

namespace Lails.MQClient.RabbitMQ.Net6
{
    /// <summary>
    /// Minimal RabbitMQ client that owns one connection and one channel and offers
    /// publish / subscribe helpers which declare exchanges, queues and bindings on demand.
    /// </summary>
    /// <remarks>
    /// Exchanges, queues and bindings that were already declared through this instance are
    /// remembered so they are not declared again; that memory is cleared whenever the
    /// connection is released. Apart from <see cref="UnSubscribe"/>, channel operations are
    /// not synchronized by this class.
    /// </remarks>
    public class SimpleRabbitMqClient : IDisposable
    {
        /// <summary>
        /// AMQP delivery mode value that marks a message as persistent.
        /// </summary>
        private const byte PersistentDeliveryMode = 2;

        /// <summary>
        /// Factory holding the connection settings supplied to the constructor.
        /// </summary>
        private readonly ConnectionFactory _factory;
        /// <summary>
        /// Current connection, or <c>null</c> when not connected.
        /// </summary>
        private IConnection? _conn;
        /// <summary>
        /// Current channel, or <c>null</c> when not connected.
        /// </summary>
        private IModel? _channel;

        /// <summary>
        /// Number of return handlers registered through <see cref="AddReturnEvent"/>.
        /// </summary>
        private long _returnEventCount;

        /// <summary>
        /// Stores the connection settings. No network activity happens until <see cref="Connect"/> is called.
        /// </summary>
        /// <param name="host">Broker host name.</param>
        /// <param name="port">Broker port.</param>
        /// <param name="userName">User name used to authenticate.</param>
        /// <param name="password">Password used to authenticate.</param>
        /// <param name="clientName">Client-provided connection name.</param>
        /// <param name="virtualHost">Virtual host to connect to; defaults to "/".</param>
        public SimpleRabbitMqClient(string host, int port, string userName, string password, string clientName, string virtualHost = "/")
        {
            // Create and configure the connection factory.
            _factory = new ConnectionFactory
            {
                HostName = host,
                Port = port,
                VirtualHost = virtualHost,
                UserName = userName,
                Password = password,
                AutomaticRecoveryEnabled = true,
                ClientProvidedName = clientName,
            };
        }

        /// <summary>
        /// Opens a connection and a channel on it. Any connection previously opened by this
        /// instance is released first, so calling this method repeatedly does not leak connections.
        /// </summary>
        public void Connect()
        {
            DisConnect();

            // Create the connection through the factory.
            IConnection conn = _factory.CreateConnection();
            try
            {
                // Create a channel on the connection.
                _channel = conn.CreateModel();
            }
            catch
            {
                // Do not leave a half-initialized connection open when the channel cannot be created.
                conn.Dispose();
                throw;
            }
            _conn = conn;
        }

        /// <summary>
        /// Closes the channel and the connection (if any) and forgets every cached
        /// exchange / queue / binding declaration. Safe to call when not connected
        /// and safe to call more than once.
        /// </summary>
        public void DisConnect()
        {
            IModel? channel = _channel;
            IConnection? conn = _conn;
            _channel = null;
            _conn = null;
            ClearDeclarationCache();

            try
            {
                // Only close what is still open, so a repeated call does not close twice.
                if (channel is { IsOpen: true })
                {
                    channel.Close();
                }
                if (conn is { IsOpen: true })
                {
                    conn.Close();
                }
            }
            finally
            {
                channel?.Dispose();
                conn?.Dispose();
            }
        }

        /// <summary>
        /// Registers a handler on the current channel that is invoked when a published
        /// message is returned because it could not be routed to a queue.
        /// </summary>
        /// <param name="returnCallback">
        /// Receives the exchange, the routing key and a copy of the message body.
        /// </param>
        /// <returns>
        /// The number of handlers registered through this method so far. When there is no
        /// channel or <paramref name="returnCallback"/> is <c>null</c>, nothing is registered
        /// and the current count is returned.
        /// </returns>
        public long AddReturnEvent(Action<string, string, byte[]>? returnCallback)
        {
            IModel? channel = _channel;
            if (channel is null || returnCallback is null)
            {
                return Interlocked.Read(ref _returnEventCount);
            }

            channel.BasicReturn += (sender, e) =>
            {
                // Copy the body into an array owned by the callback; an empty body yields an empty array.
                returnCallback(e.Exchange, e.RoutingKey, e.Body.ToArray());
            };
            return Interlocked.Increment(ref _returnEventCount);
        }

        /// <summary>
        /// Publishes a message, declaring the exchange first if this instance has not declared it yet.
        /// </summary>
        /// <param name="exchange">Target exchange. A blank name is never declared.</param>
        /// <param name="routingKey">Routing key of the message.</param>
        /// <param name="body">Message payload.</param>
        /// <param name="exchangeType">Exchange type used when the exchange is declared.</param>
        /// <param name="durable">
        /// Durability used when the exchange is declared; when <c>true</c> the message is also
        /// published with the persistent delivery mode.
        /// </param>
        /// <param name="autoDelete">Auto-delete flag used when the exchange is declared.</param>
        /// <param name="mandatory">Passed through to the publish call as the mandatory flag.</param>
        /// <returns><c>true</c> once the message has been handed to the channel.</returns>
        /// <exception cref="InvalidOperationException">The client is not connected.</exception>
        public bool Publish(string exchange, string routingKey, byte[] body, string exchangeType = ExchangeType.Topic,
                            bool durable = false, bool autoDelete = true, bool mandatory = false)
        {
            IModel channel = RequireChannel();

            // Declare the exchange if needed.
            EnsureExchange(channel, exchange, exchangeType, durable, autoDelete);

            // Message properties are only needed to request persistence.
            IBasicProperties? props = null;
            if (durable)
            {
                props = channel.CreateBasicProperties();
                props.DeliveryMode = PersistentDeliveryMode;
            }
            channel.BasicPublish(exchange, routingKey, mandatory, props, body);

            return true;
        }

        /// <summary>
        /// Declares the exchange and queue if needed, binds the queue to the exchange with
        /// <paramref name="routingKey"/> and starts a consumer with manual acknowledgement.
        /// </summary>
        /// <param name="exchange">Exchange to bind to. A blank name is never declared.</param>
        /// <param name="queue">Queue to consume from.</param>
        /// <param name="routingKey">Routing key used for the binding.</param>
        /// <param name="durable">Durability used when the exchange and the queue are declared.</param>
        /// <param name="callback">
        /// Invoked for every delivery with: a copy of the body, the delivery's routing key, the
        /// delivery's exchange, the consumer tag and the routing key passed to this method.
        /// The message is acknowledged only when the callback returns <c>true</c>; when it
        /// returns <c>false</c> the message is left unacknowledged.
        /// </param>
        /// <param name="exchangeType">Exchange type used when the exchange is declared.</param>
        /// <param name="autoDelete">Auto-delete flag used when the exchange and the queue are declared.</param>
        /// <returns><c>true</c> once the consumer has been started.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="queue"/> or <paramref name="routingKey"/> is null or blank, or
        /// <paramref name="callback"/> is null.
        /// </exception>
        /// <exception cref="InvalidOperationException">The client is not connected.</exception>
        public bool Subscribe(string exchange, string queue, string routingKey, bool durable, Func<byte[], string, string, string, string, bool> callback, string exchangeType = ExchangeType.Topic, bool autoDelete = true)
        {
            if (string.IsNullOrWhiteSpace(queue))
            {
                throw new ArgumentNullException(nameof(queue), "queue and topic params is empty!");
            }
            if (string.IsNullOrWhiteSpace(routingKey))
            {
                throw new ArgumentNullException(nameof(routingKey), "queue and topic params is empty!");
            }
            if (callback is null)
            {
                throw new ArgumentNullException(nameof(callback));
            }

            IModel channel = RequireChannel();

            // Declare the exchange if needed.
            EnsureExchange(channel, exchange, exchangeType, durable, autoDelete);

            // Declare the queue if needed.
            bool declaredNow = false;
            if (!ExistQueue(queue))
            {
                channel.QueueDeclare(queue, durable, false, autoDelete, null);
                AddQueue(queue);
                declaredNow = true;
            }

            // Bind the queue to the exchange. The binding is tracked per routing key so that a
            // second subscription on the same queue with another routing key is bound as well.
            var bindingKey = (exchange, queue, routingKey);
            if (declaredNow || !_bindings.ContainsKey(bindingKey))
            {
                channel.QueueBind(queue, exchange, routingKey, null);
                _bindings[bindingKey] = true;
            }

            // Create the consumer.
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                // Copy the body into an array owned by the callback; an empty body yields an empty array.
                byte[] data = e.Body.ToArray();
                if (callback(data, e.RoutingKey, e.Exchange, e.ConsumerTag, routingKey))
                {
                    // Acknowledge on the channel that delivered the message.
                    channel.BasicAck(e.DeliveryTag, false);
                }
            };

            // Attach the consumer to the queue with automatic acknowledgement disabled.
            channel.BasicConsume(queue, false, consumer);
            return true;
        }

        /// <summary>
        /// Removes the binding between the queue and the exchange for the given routing key
        /// and forgets the cached queue and binding. The consumer itself is not cancelled.
        /// </summary>
        /// <param name="exchange">Exchange the queue is bound to.</param>
        /// <param name="queue">Queue to unbind.</param>
        /// <param name="routingKey">Routing key of the binding to remove.</param>
        /// <returns>Always <c>true</c>; when not connected nothing is done.</returns>
        public bool UnSubscribe(string exchange, string queue, string routingKey)
        {
            IModel? channel = _channel;
            if (channel is null)
            {
                return true;
            }

            lock (channel)
            {
                channel.QueueUnbind(queue, exchange, routingKey, null);
                RemoveQueue(queue);
                _bindings.TryRemove((exchange, queue, routingKey), out _);
            }
            return true;
        }

        /// <summary>
        /// Releases the channel and the connection. Safe to call after <see cref="DisConnect"/>
        /// and safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            IModel? channel = _channel;
            IConnection? conn = _conn;
            _channel = null;
            _conn = null;
            ClearDeclarationCache();

            try
            {
                channel?.Dispose();
            }
            finally
            {
                conn?.Dispose();
            }
            GC.SuppressFinalize(this);
        }

        #region Helpers

        /// <summary>
        /// Names of the queues already declared through this instance.
        /// </summary>
        private readonly ConcurrentDictionary<string, bool> _queues = new ConcurrentDictionary<string, bool>();
        /// <summary>
        /// Names of the exchanges already declared through this instance.
        /// </summary>
        private readonly ConcurrentDictionary<string, bool> _exchanges = new ConcurrentDictionary<string, bool>();
        /// <summary>
        /// Bindings already created through this instance, keyed by exchange, queue and routing key.
        /// </summary>
        private readonly ConcurrentDictionary<(string Exchange, string Queue, string RoutingKey), bool> _bindings =
            new ConcurrentDictionary<(string Exchange, string Queue, string RoutingKey), bool>();

        /// <summary>
        /// Returns the current channel or throws when the client is not connected.
        /// </summary>
        /// <returns>The current channel.</returns>
        /// <exception cref="InvalidOperationException">There is no channel.</exception>
        private IModel RequireChannel()
        {
            return _channel ?? throw new InvalidOperationException("The client is not connected; call Connect() first.");
        }

        /// <summary>
        /// Declares the exchange on the given channel unless it is already cached (or its name is blank).
        /// </summary>
        /// <param name="channel">Channel to declare on.</param>
        /// <param name="exchange">Exchange name.</param>
        /// <param name="exchangeType">Exchange type.</param>
        /// <param name="durable">Durability of the exchange.</param>
        /// <param name="autoDelete">Auto-delete flag of the exchange.</param>
        private void EnsureExchange(IModel channel, string exchange, string exchangeType, bool durable, bool autoDelete)
        {
            if (ExistExchange(exchange))
            {
                return;
            }

            channel.ExchangeDeclare(exchange, exchangeType, durable, autoDelete);
            AddExchange(exchange);
        }

        /// <summary>
        /// Forgets every cached exchange, queue and binding declaration.
        /// </summary>
        private void ClearDeclarationCache()
        {
            _queues.Clear();
            _exchanges.Clear();
            _bindings.Clear();
        }

        /// <summary>
        /// Tells whether the queue is already cached as declared.
        /// </summary>
        /// <param name="queue">Queue name.</param>
        /// <returns><c>true</c> when the name is blank or the queue is cached.</returns>
        private bool ExistQueue(string queue)
        {
            return string.IsNullOrWhiteSpace(queue) || _queues.ContainsKey(queue);
        }

        /// <summary>
        /// Caches the queue as declared; blank names are ignored.
        /// </summary>
        /// <param name="queue">Queue name.</param>
        private void AddQueue(string queue)
        {
            if (!string.IsNullOrWhiteSpace(queue))
            {
                _queues[queue] = true;
            }
        }

        /// <summary>
        /// Removes the queue from the cache; blank names are ignored.
        /// </summary>
        /// <param name="queue">Queue name.</param>
        private void RemoveQueue(string queue)
        {
            if (!string.IsNullOrWhiteSpace(queue))
            {
                _queues.TryRemove(queue, out _);
            }
        }

        /// <summary>
        /// Tells whether the exchange is already cached as declared.
        /// </summary>
        /// <param name="exchange">Exchange name.</param>
        /// <returns><c>true</c> when the name is blank or the exchange is cached.</returns>
        private bool ExistExchange(string exchange)
        {
            return string.IsNullOrWhiteSpace(exchange) || _exchanges.ContainsKey(exchange);
        }

        /// <summary>
        /// Caches the exchange as declared; blank names are ignored.
        /// </summary>
        /// <param name="exchange">Exchange name.</param>
        private void AddExchange(string exchange)
        {
            if (!string.IsNullOrWhiteSpace(exchange))
            {
                _exchanges[exchange] = true;
            }
        }

        #endregion
    }
}
